package cloud.uke.jds.api;

import static cloud.uke.jds.api.DataSyncStat.PAUSED;
import static cloud.uke.jds.api.DataSyncStat.RUNNING;
import static cloud.uke.jds.api.DataSyncStat.STOPPED;

import cloud.uke.jds.api.internal.InternalWrapSyncData;
import java.util.concurrent.locks.LockSupport;
import lombok.extern.slf4j.Slf4j;

/**
 * 数据同步主程序
 *
 * @author du
 */
@Slf4j
public class DataSync<T, ID> {

  private final DataSyncBuilder<T, ID> builder;
  private DataSyncStat stat;
  private Thread taskThread;
  /** 为null 代表消费成功 */
  private InternalWrapSyncData<T, ID> lastData;

  DataSync(DataSyncBuilder<T, ID> builder) {
    this.builder = builder;
  }

  /**
   * builder.
   *
   * @param <T> a T class
   * @param <ID> a ID class
   * @return a {@link cloud.uke.jds.api.DataSyncBuilder} object
   */
  public static <T, ID> DataSyncBuilder<T, ID> builder() {
    return new DataSyncBuilder<>();
  }

  /** 开始任务 */
  public synchronized void start() {
    if (stat == null) {
      stat = DataSyncStat.INIT;
    }
    switch (stat) {
      case INIT:
        init();
        break;
      default:
        checkReady();
    }
    stat = RUNNING;
    LockSupport.unpark(taskThread);
  }

  private void init() {

    // 准备工作
    builder.internalStoreDataHandler.prepare();

    taskThread = new Thread(this::runTask);
    taskThread.setName("dataSync-" + builder.srcId + "-" + builder.targetId);
    taskThread.start();

    stat = DataSyncStat.INIT;
  }

  private void runTask() {
    while (true) {
      switch (stat) {
          // 初始化和暂停状态等待
        case INIT:
        case PAUSED:
          log.info("task stat not running, wait");
          LockSupport.park();
          continue;
        case RUNNING:
          log.debug("task is running");

          if (lastData == null) {
            try {
              lastData = builder.internalStoreDataHandler.poll();
            } catch (Exception e) {
              log.error(e.getMessage(), e);
              waitTime();
              continue;
            }
          }

          if (lastData == null) {
            log.debug("poll data is null, sleep waitMills {}", builder.waitMills);
            LockSupport.parkNanos(builder.waitMills * 1000 * 1000);
            continue;
          }

          handleData(lastData);

          break;
        case STOPPED:
          log.info("task stopped");
          return;
        default:
      }
    }
  }

  private void handleData(InternalWrapSyncData<T, ID> data) {
    if (log.isDebugEnabled()) {
      log.debug("handle user data: {}", data);
    }
    try {
      SyncResult syncResult =
          builder.userDataHandler.doSync(
              data.getLastSyncId(), data.getSrc(), data.getTarget(), data.getData());
      if (syncResult == null) {
        log.warn(
            "user data handler returned RETRY, sleep waitMills {} and retry", builder.waitMills);
        waitTime();
        return;
      }

      if (syncResult == SyncResult.NEXT) {
        // 处理成功，重置最后的消息状态
        resetLastData();
        builder.internalStoreDataHandler.success(data);
        log.debug("user data handler returned NEXT, continue handle next record");
        return;
      }

      log.warn("user data handler returned RETRY, sleep waitMills {} and retry", builder.waitMills);
      waitTime();
    } catch (Exception e) {
      log.warn(
          "handle user data occurred error: {}, sleep waitMills {} and retry",
          e.getMessage(),
          builder.waitMills);
      waitTime();
    }
  }

  private void resetLastData() {
    lastData = null;
  }

  private void waitTime() {
    LockSupport.parkNanos(builder.waitMills * 1000 * 1000);
  }

  /** 暂停同步任务 */
  public synchronized void pause() {
    checkReady();
    stat = DataSyncStat.PAUSED;
  }

  /**
   * 危险！！！
   *
   * <p>清除同步数据和同步状态
   */
  public synchronized void veryDangerOperationCleanAllData() {
    checkReady();

    builder.internalStoreDataHandler.veryDangerOperationCleanAllData();
  }

  /**
   * 灌入新的 要同步的数据
   *
   * <p>同步操作
   *
   * @param t 数据
   */
  public synchronized void putData(T t) {
    checkReady();
    builder.internalStoreDataHandler.save(t);

    // 唤醒线程，处理新的数据
    if (stat == RUNNING) {
      LockSupport.unpark(taskThread);
    }
  }

  private void checkReady() {
    boolean ready = (stat == RUNNING) || (stat == PAUSED);
    if (!ready) {
      throw new IllegalStateException("sync not ready (RUNNING or PAUSED)");
    }
  }

  /** 停止同步任务，销毁资源 */
  public synchronized void stop() {
    stat = STOPPED;
    builder.internalStoreDataHandler.stop();
  }

  /** 重置同步任务状态（删除同步标记） */
  public synchronized void reset() {
    checkReady();
    builder.internalStoreDataHandler.reset();
  }
}
